//! The client-side proxy API.

use std::collections::{
    HashMap,
    HashSet,
};
use std::fmt;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::{
    Arc,
    OnceLock,
    RwLock,
    RwLockReadGuard,
};
use std::task::{
    Context,
    Poll,
};

use enumflags2::{
    BitFlags,
    bitflags,
};
use event_listener::{
    Event,
    EventListener,
};
use futures_core::{
    ready,
    stream,
};
use futures_util::future::Either;
use futures_util::stream::Map;
use ordered_stream::{
    FromFuture,
    Join,
    OrderedStream,
    PollResult,
    join as join_streams,
};
use static_assertions::assert_impl_all;
use tracing::{
    Instrument,
    debug,
    info_span,
    instrument,
    trace,
};
use zbus_names::{
    BusName,
    InterfaceName,
    MemberName,
    UniqueName,
};
use zvariant::{
    ObjectPath,
    OwnedValue,
    Str,
    Value,
};

use crate::fdo::{
    self,
    IntrospectableProxy,
    NameOwnerChanged,
    PropertiesChangedStream,
    PropertiesProxy,
};
use crate::message::{
    Flags,
    Message,
    Sequence,
    Type,
};
use crate::{
    AsyncDrop,
    Connection,
    Error,
    Executor,
    MatchRule,
    MessageStream,
    OwnedMatchRule,
    Result,
    Task,
};

mod builder;
pub use builder::{
    Builder,
    CacheProperties,
    ProxyDefault,
};

/// A client-side interface proxy.
///
/// A `Proxy` is a helper to interact with an interface on a remote object.
///
/// # Example
///
/// ```no_run
/// use std::error::Error;
/// use std::result::Result;
///
/// use zbus::{
///     Connection,
///     Proxy,
/// };
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
///     let connection = Connection::session().await?;
///     let p = Proxy::new(
///         &connection,
///         "org.freedesktop.DBus",
///         "/org/freedesktop/DBus",
///         "org.freedesktop.DBus",
///     )
///     .await?;
///     // owned return value
///     let _id: String = p.call("GetId", &()).await?;
///     // borrowed return value
///     let body = p.call_method("GetId", &()).await?.body();
///     let _id: &str = body.deserialize()?;
///
///     Ok(())
/// }
/// ```
///
/// # Note
///
/// It is recommended to use the [`proxy`] macro, which provides a more convenient and
/// type-safe *façade* `Proxy` derived from a Rust trait.
///
/// [`futures` crate]: https://crates.io/crates/futures
/// [`proxy`]: attr.proxy.html
#[derive(Clone, Debug)]
pub struct Proxy<'a> {
    pub(crate) inner: Arc<ProxyInner<'a>>,
}

assert_impl_all!(Proxy<'_>: Send, Sync, Unpin);

/// This is required to avoid having the Drop impl extend the lifetime 'a, which breaks zbus_xmlgen
/// (and possibly other crates).
pub(crate) struct ProxyInnerStatic {
    pub(crate) conn: Connection,
    dest_owner_change_match_rule: OnceLock<OwnedMatchRule>,
}

impl fmt::Debug for ProxyInnerStatic {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ProxyInnerStatic")
            .field("dest_owner_change_match_rule", &self.dest_owner_change_match_rule)
            .finish_non_exhaustive()
    }
}

#[derive(Debug)]
pub(crate) struct ProxyInner<'a> {
    inner_without_borrows: ProxyInnerStatic,
    pub(crate) destination: BusName<'a>,
    pub(crate) path: ObjectPath<'a>,
    pub(crate) interface: InterfaceName<'a>,

    /// Cache of property values.
    property_cache: Option<OnceLock<(Arc<PropertiesCache>, Task<()>)>>,
    /// Set of properties which do not get cached, by name.
    /// This overrides proxy-level caching behavior.
    uncached_properties: HashSet<Str<'a>>,
}

impl Drop for ProxyInnerStatic {
    fn drop(&mut self) {
        if let Some(rule) = self.dest_owner_change_match_rule.take() {
            self.conn.queue_remove_match(rule);
        }
    }
}

/// A property changed event.
///
/// The property changed event generated by [`PropertyStream`].
pub struct PropertyChanged<'a, T> {
    name: &'a str,
    properties: Arc<PropertiesCache>,
    proxy: Proxy<'a>,
    phantom: std::marker::PhantomData<T>,
}

impl<T> PropertyChanged<'_, T> {
    /// The name of the property that changed.
    pub fn name(&self) -> &str {
        self.name
    }

    /// Get the raw value of the property that changed.
    ///
    /// If the notification signal contained the new value, it has been cached already and this call
    /// will return that value. Otherwise (i.e. invalidated property), a D-Bus call is made to fetch
    /// and cache the new value.
    pub async fn get_raw(&self) -> Result<impl Deref<Target = Value<'static>> + '_> {
        struct Wrapper<'w> {
            name: &'w str,
            values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>,
        }

        impl Deref for Wrapper<'_> {
            type Target = Value<'static>;

            fn deref(&self) -> &Self::Target {
                self.values
                    .get(self.name)
                    .expect("PropertyStream with no corresponding property")
                    .value
                    .as_ref()
                    .expect("PropertyStream with no corresponding property")
            }
        }

        {
            let values = self.properties.values.read().expect("lock poisoned");
            if values
                .get(self.name)
                .expect("PropertyStream with no corresponding property")
                .value
                .is_some()
            {
                return Ok(Wrapper {
                    name: self.name,
                    values,
                });
            }
        }

        // The property was invalidated, so we need to fetch the new value.
        let properties_proxy = self.proxy.properties_proxy();
        let value = properties_proxy
            .get(self.proxy.inner.interface.clone(), self.name)
            .await
            .map_err(crate::Error::from)?;

        // Save the new value
        {
            let mut values = self.properties.values.write().expect("lock poisoned");

            values
                .get_mut(self.name)
                .expect("PropertyStream with no corresponding property")
                .value = Some(value);
        }

        Ok(Wrapper {
            name: self.name,
            values: self.properties.values.read().expect("lock poisoned"),
        })
    }
}

impl<T> PropertyChanged<'_, T>
where
    T: TryFrom<zvariant::OwnedValue>,
    T::Error: Into<crate::Error>,
{
    /// Get the value of the property that changed.
    ///
    /// If the notification signal contained the new value, it has been cached already and this call
    /// will return that value. Otherwise (i.e. invalidated property), a D-Bus call is made to fetch
    /// and cache the new value.
    pub async fn get(&self) -> Result<T> {
        self.get_raw()
            .await
            .and_then(|v| T::try_from(OwnedValue::try_from(&*v)?).map_err(Into::into))
    }
}

/// A [`stream::Stream`] implementation that yields property change notifications.
///
/// Use [`Proxy::receive_property_changed`] to create an instance of this type.
#[derive(Debug)]
pub struct PropertyStream<'a, T> {
    name: &'a str,
    proxy: Proxy<'a>,
    changed_listener: EventListener,
    phantom: std::marker::PhantomData<T>,
}

impl<'a, T> stream::Stream for PropertyStream<'a, T>
where
    T: Unpin,
{
    type Item = PropertyChanged<'a, T>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let m = self.get_mut();
        let properties = match m.proxy.get_property_cache() {
            Some(properties) => properties.clone(),
            // With no cache, we will get no updates; return immediately
            None => return Poll::Ready(None),
        };
        ready!(Pin::new(&mut m.changed_listener).poll(cx));

        m.changed_listener = properties
            .values
            .read()
            .expect("lock poisoned")
            .get(m.name)
            .expect("PropertyStream with no corresponding property")
            .event
            .listen();

        Poll::Ready(Some(PropertyChanged {
            name: m.name,
            properties,
            proxy: m.proxy.clone(),
            phantom: std::marker::PhantomData,
        }))
    }
}

#[derive(Debug)]
pub(crate) struct PropertiesCache {
    values: RwLock<HashMap<String, PropertyValue>>,
    caching_result: RwLock<CachingResult>,
}

#[derive(Debug)]
enum CachingResult {
    Caching { ready: Event },
    Cached { result: Result<()> },
}

impl PropertiesCache {
    #[instrument(skip_all)]
    fn new(
        proxy: PropertiesProxy<'static>,
        interface: InterfaceName<'static>,
        executor: &Executor<'_>,
        uncached_properties: HashSet<zvariant::Str<'static>>,
    ) -> (Arc<Self>, Task<()>) {
        let cache = Arc::new(PropertiesCache {
            values: Default::default(),
            caching_result: RwLock::new(CachingResult::Caching { ready: Event::new() }),
        });

        let cache_clone = cache.clone();
        let task_name = format!("{interface} proxy caching");
        let proxy_caching = async move {
            let result = cache_clone.init(proxy, interface, uncached_properties).await;
            let (prop_changes, interface, uncached_properties) = {
                let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned");
                let ready = match &*caching_result {
                    CachingResult::Caching { ready } => ready,
                    // SAFETY: This is the only part of the code that changes this state and it's
                    // only run once.
                    CachingResult::Cached { .. } => unreachable!(),
                };
                match result {
                    Ok((prop_changes, interface, uncached_properties)) => {
                        ready.notify(usize::MAX);
                        *caching_result = CachingResult::Cached { result: Ok(()) };

                        (prop_changes, interface, uncached_properties)
                    },
                    Err(e) => {
                        ready.notify(usize::MAX);
                        *caching_result = CachingResult::Cached { result: Err(e) };

                        return;
                    },
                }
            };

            if let Err(e) = cache_clone
                .keep_updated(prop_changes, interface, uncached_properties)
                .await
            {
                debug!("Error keeping properties cache updated: {e}");
            }
        }
        .instrument(info_span!("{}", task_name));
        let task = executor.spawn(proxy_caching, &task_name);

        (cache, task)
    }

    /// new() runs this in a task it spawns for initialization of properties cache.
    async fn init(
        &self,
        proxy: PropertiesProxy<'static>,
        interface: InterfaceName<'static>,
        uncached_properties: HashSet<zvariant::Str<'static>>,
    ) -> Result<(
        PropertiesChangedStream<'static>,
        InterfaceName<'static>,
        HashSet<zvariant::Str<'static>>,
    )> {
        use ordered_stream::OrderedStreamExt;

        let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left);

        let get_all = proxy
            .inner()
            .connection()
            .call_method_raw(
                Some(proxy.inner().destination()),
                proxy.inner().path(),
                Some(proxy.inner().interface()),
                "GetAll",
                BitFlags::empty(),
                &interface,
            )
            .await
            .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

        let mut join = join_streams(prop_changes, get_all);

        loop {
            match join.next().await {
                Some(Either::Left(_update)) => {
                    // discard updates prior to the initial population
                },
                Some(Either::Right(populate)) => {
                    populate?.body().deserialize().map(|values| {
                        self.update_cache(&uncached_properties, &values, Vec::new(), &interface);
                    })?;
                    break;
                },
                None => break,
            }
        }
        if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() {
            // if an update was buffered, then it happened after the get_all returned and needs to
            // be applied before we discard the join
            if let Ok(args) = update.args() {
                if args.interface_name == interface {
                    self.update_cache(
                        &uncached_properties,
                        &args.changed_properties,
                        args.invalidated_properties,
                        &interface,
                    );
                }
            }
        }
        // This is needed to avoid a "implementation of `OrderedStream` is not general enough"
        // error that occurs if you apply the map and join to Pin::new(&mut prop_changes) instead
        // of directly to the stream.
        let prop_changes = join.into_inner().0.into_inner();

        Ok((prop_changes, interface, uncached_properties))
    }

    /// new() runs this in a task it spawns for keeping the cache in sync.
    #[instrument(skip_all)]
    async fn keep_updated(
        &self,
        mut prop_changes: PropertiesChangedStream<'static>,
        interface: InterfaceName<'static>,
        uncached_properties: HashSet<zvariant::Str<'static>>,
    ) -> Result<()> {
        use futures_util::StreamExt;

        trace!("Listening for property changes on {interface}...");
        while let Some(update) = prop_changes.next().await {
            if let Ok(args) = update.args() {
                if args.interface_name == interface {
                    self.update_cache(
                        &uncached_properties,
                        &args.changed_properties,
                        args.invalidated_properties,
                        &interface,
                    );
                }
            }
        }

        Ok(())
    }

    fn update_cache(
        &self,
        uncached_properties: &HashSet<Str<'_>>,
        changed: &HashMap<&str, Value<'_>>,
        invalidated: Vec<&str>,
        interface: &InterfaceName<'_>,
    ) {
        let mut values = self.values.write().expect("lock poisoned");

        for inval in invalidated {
            if uncached_properties.contains(&Str::from(inval)) {
                debug!("Ignoring invalidation of uncached property `{}.{}`", interface, inval);
                continue;
            }
            trace!("Property `{interface}.{inval}` invalidated");

            if let Some(entry) = values.get_mut(inval) {
                entry.value = None;
                entry.event.notify(usize::MAX);
            }
        }

        for (property_name, value) in changed {
            if uncached_properties.contains(&Str::from(*property_name)) {
                debug!("Ignoring update of uncached property `{}.{}`", interface, property_name);
                continue;
            }
            trace!("Property `{interface}.{property_name}` updated");

            let entry = values.entry((*property_name).to_string()).or_default();

            let value = match OwnedValue::try_from(value) {
                Ok(value) => value,
                Err(e) => {
                    debug!("Failed to convert property `{interface}.{property_name}` to OwnedValue: {e}");
                    continue;
                },
            };
            entry.value = Some(value);
            entry.event.notify(usize::MAX);
        }
    }

    /// Wait for the cache to be populated and return any error encountered during population.
    pub(crate) async fn ready(&self) -> Result<()> {
        let listener = match &*self.caching_result.read().expect("lock poisoned") {
            CachingResult::Caching { ready } => ready.listen(),
            CachingResult::Cached { result } => return result.clone(),
        };
        listener.await;

        // It must be ready now.
        match &*self.caching_result.read().expect("lock poisoned") {
            // SAFETY: We were just notified that state has changed to `Cached` and we never go back
            // to `Caching` once in `Cached`.
            CachingResult::Caching { .. } => unreachable!(),
            CachingResult::Cached { result } => result.clone(),
        }
    }
}

impl<'a> ProxyInner<'a> {
    pub(crate) fn new(
        conn: Connection,
        destination: BusName<'a>,
        path: ObjectPath<'a>,
        interface: InterfaceName<'a>,
        cache: CacheProperties,
        uncached_properties: HashSet<Str<'a>>,
    ) -> Self {
        let property_cache = match cache {
            CacheProperties::Yes | CacheProperties::Lazily => Some(OnceLock::new()),
            CacheProperties::No => None,
        };
        Self {
            inner_without_borrows: ProxyInnerStatic {
                conn,
                dest_owner_change_match_rule: OnceLock::new(),
            },
            destination,
            path,
            interface,
            property_cache,
            uncached_properties,
        }
    }

    /// Subscribe to the "NameOwnerChanged" signal on the bus for our destination.
    ///
    /// If the destination is a unique name, we will not subscribe to the signal.
    pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> {
        if !self.inner_without_borrows.conn.is_bus() {
            // Names don't mean much outside the bus context.
            return Ok(());
        }

        let well_known_name = match &self.destination {
            BusName::WellKnown(well_known_name) => well_known_name,
            BusName::Unique(_) => return Ok(()),
        };

        if self.inner_without_borrows.dest_owner_change_match_rule.get().is_some() {
            // Already watching over the bus for any name updates so nothing to do here.
            return Ok(());
        }

        let conn = &self.inner_without_borrows.conn;
        let signal_rule: OwnedMatchRule = MatchRule::builder()
            .msg_type(Type::Signal)
            .sender("org.freedesktop.DBus")?
            .path("/org/freedesktop/DBus")?
            .interface("org.freedesktop.DBus")?
            .member("NameOwnerChanged")?
            .add_arg(well_known_name.as_str())?
            .build()
            .to_owned()
            .into();

        conn.add_match(signal_rule.clone(), Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED))
            .await?;

        if self
            .inner_without_borrows
            .dest_owner_change_match_rule
            .set(signal_rule.clone())
            .is_err()
        {
            // we raced another destination_unique_name call and added it twice
            conn.remove_match(signal_rule).await?;
        }

        Ok(())
    }
}

const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;

impl<'a> Proxy<'a> {
    /// Create a new `Proxy` for the given destination/path/interface.
    pub async fn new<D, P, I>(conn: &Connection, destination: D, path: P, interface: I) -> Result<Proxy<'a>>
    where
        D: TryInto<BusName<'a>>,
        P: TryInto<ObjectPath<'a>>,
        I: TryInto<InterfaceName<'a>>,
        D::Error: Into<Error>,
        P::Error: Into<Error>,
        I::Error: Into<Error>,
    {
        Builder::new(conn)
            .destination(destination)?
            .path(path)?
            .interface(interface)?
            .build()
            .await
    }

    /// Create a new `Proxy` for the given destination/path/interface, taking ownership of all
    /// passed arguments.
    pub async fn new_owned<D, P, I>(conn: Connection, destination: D, path: P, interface: I) -> Result<Proxy<'a>>
    where
        D: TryInto<BusName<'static>>,
        P: TryInto<ObjectPath<'static>>,
        I: TryInto<InterfaceName<'static>>,
        D::Error: Into<Error>,
        P::Error: Into<Error>,
        I::Error: Into<Error>,
    {
        Builder::new(&conn)
            .destination(destination)?
            .path(path)?
            .interface(interface)?
            .build()
            .await
    }

    /// Get a reference to the associated connection.
    pub fn connection(&self) -> &Connection {
        &self.inner.inner_without_borrows.conn
    }

    /// Get a reference to the destination service name.
    pub fn destination(&self) -> &BusName<'_> {
        &self.inner.destination
    }

    /// Get a reference to the object path.
    pub fn path(&self) -> &ObjectPath<'_> {
        &self.inner.path
    }

    /// Get a reference to the interface.
    pub fn interface(&self) -> &InterfaceName<'_> {
        &self.inner.interface
    }

    /// Introspect the associated object, and return the XML description.
    ///
    /// See the [xml](xml/index.html) module for parsing the
    /// result.
    pub async fn introspect(&self) -> fdo::Result<String> {
        let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn)
            .destination(&self.inner.destination)?
            .path(&self.inner.path)?
            .build()
            .await?;

        proxy.introspect().await
    }

    fn properties_proxy(&self) -> PropertiesProxy<'_> {
        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
            // Safe because already checked earlier
            .destination(self.inner.destination.as_ref())
            .unwrap()
            // Safe because already checked earlier
            .path(self.inner.path.as_ref())
            .unwrap()
            // does not have properties
            .cache_properties(CacheProperties::No)
            .build_internal()
            .unwrap()
            .into()
    }

    fn owned_properties_proxy(&self) -> PropertiesProxy<'static> {
        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
            // Safe because already checked earlier
            .destination(self.inner.destination.to_owned())
            .unwrap()
            // Safe because already checked earlier
            .path(self.inner.path.to_owned())
            .unwrap()
            // does not have properties
            .cache_properties(CacheProperties::No)
            .build_internal()
            .unwrap()
            .into()
    }

    /// Get the cache, starting it in the background if needed.
    ///
    /// Use PropertiesCache::ready() to wait for the cache to be populated and to get any errors
    /// encountered in the population.
    pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> {
        let cache = match &self.inner.property_cache {
            Some(cache) => cache,
            None => return None,
        };
        let (cache, _) = &cache.get_or_init(|| {
            let proxy = self.owned_properties_proxy();
            let interface = self.interface().to_owned();
            let uncached_properties: HashSet<zvariant::Str<'static>> =
                self.inner.uncached_properties.iter().map(|s| s.to_owned()).collect();
            let executor = self.connection().executor();

            PropertiesCache::new(proxy, interface, executor, uncached_properties)
        });

        Some(cache)
    }

    /// Get the cached value of the property `property_name`.
    ///
    /// This returns `None` if the property is not in the cache.  This could be because the cache
    /// was invalidated by an update, because caching was disabled for this property or proxy, or
    /// because the cache has not yet been populated.  Use `get_property` to fetch the value from
    /// the peer.
    pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>>
    where
        T: TryFrom<OwnedValue>,
        T::Error: Into<Error>,
    {
        self.cached_property_raw(property_name)
            .as_deref()
            .map(|v| T::try_from(OwnedValue::try_from(v)?).map_err(Into::into))
            .transpose()
    }

    /// Get the cached value of the property `property_name`.
    ///
    /// Same as `cached_property`, but gives you access to the raw value stored in the cache. This
    /// is useful if you want to avoid allocations and cloning.
    pub fn cached_property_raw<'p>(
        &'p self,
        property_name: &'p str,
    ) -> Option<impl Deref<Target = Value<'static>> + 'p> {
        if let Some(values) = self
            .inner
            .property_cache
            .as_ref()
            .and_then(OnceLock::get)
            .map(|c| c.0.values.read().expect("lock poisoned"))
        {
            // ensure that the property is in the cache.
            values
                .get(property_name)
                // if the property value has not yet been cached, this will return None.
                .and_then(|e| e.value.as_ref())?;

            struct Wrapper<'a> {
                values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>,
                property_name: &'a str,
            }

            impl Deref for Wrapper<'_> {
                type Target = Value<'static>;

                fn deref(&self) -> &Self::Target {
                    self.values
                        .get(self.property_name)
                        .and_then(|e| e.value.as_ref())
                        .map(|v| &**v)
                        .expect("inexistent property")
                }
            }

            Some(Wrapper { values, property_name })
        } else {
            None
        }
    }

    async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> {
        Ok(self
            .properties_proxy()
            .get(self.inner.interface.as_ref(), property_name)
            .await?)
    }

    /// Get the property `property_name`.
    ///
    /// Get the property value from the cache (if caching is enabled) or call the
    /// `Get` method of the `org.freedesktop.DBus.Properties` interface.
    pub async fn get_property<T>(&self, property_name: &str) -> Result<T>
    where
        T: TryFrom<OwnedValue>,
        T::Error: Into<Error>,
    {
        if let Some(cache) = self.get_property_cache() {
            cache.ready().await?;
        }
        if let Some(value) = self.cached_property(property_name)? {
            return Ok(value);
        }

        let value = self.get_proxy_property(property_name).await?;
        value.try_into().map_err(Into::into)
    }

    /// Set the property `property_name`.
    ///
    /// Effectively, call the `Set` method of the `org.freedesktop.DBus.Properties` interface.
    pub async fn set_property<'t, T>(&self, property_name: &str, value: T) -> fdo::Result<()>
    where
        T: 't + Into<Value<'t>>,
    {
        self.properties_proxy()
            .set(self.inner.interface.as_ref(), property_name, &value.into())
            .await
    }

    /// Call a method and return the reply.
    ///
    /// Typically, you would want to use [`call`] method instead. Use this method if you need to
    /// deserialize the reply message manually (this way, you can avoid the memory
    /// allocation/copying, by deserializing the reply to an unowned type).
    ///
    /// [`call`]: struct.Proxy.html#method.call
    pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Message>
    where
        M: TryInto<MemberName<'m>>,
        M::Error: Into<Error>,
        B: serde::ser::Serialize + zvariant::DynamicType,
    {
        self.inner
            .inner_without_borrows
            .conn
            .call_method(
                Some(&self.inner.destination),
                self.inner.path.as_str(),
                Some(&self.inner.interface),
                method_name,
                body,
            )
            .await
    }

    /// Call a method and return the reply body.
    ///
    /// Use [`call_method`] instead if you need to deserialize the reply manually/separately.
    ///
    /// [`call_method`]: struct.Proxy.html#method.call_method
    pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R>
    where
        M: TryInto<MemberName<'m>>,
        M::Error: Into<Error>,
        B: serde::ser::Serialize + zvariant::DynamicType,
        R: for<'d> zvariant::DynamicDeserialize<'d>,
    {
        let reply = self.call_method(method_name, body).await?;

        reply.body().deserialize()
    }

    /// Call a method and return the reply body, optionally supplying a set of
    /// method flags to control the way the method call message is sent and handled.
    ///
    /// Use [`call`] instead if you do not need any special handling via additional flags.
    /// If the `NoReplyExpected` flag is passed, this will return None immediately
    /// after sending the message, similar to [`call_noreply`].
    ///
    /// [`call`]: struct.Proxy.html#method.call
    /// [`call_noreply`]: struct.Proxy.html#method.call_noreply
    pub async fn call_with_flags<'m, M, B, R>(
        &self,
        method_name: M,
        flags: BitFlags<MethodFlags>,
        body: &B,
    ) -> Result<Option<R>>
    where
        M: TryInto<MemberName<'m>>,
        M::Error: Into<Error>,
        B: serde::ser::Serialize + zvariant::DynamicType,
        R: for<'d> zvariant::DynamicDeserialize<'d>,
    {
        let flags = flags.iter().map(Flags::from).collect::<BitFlags<_>>();
        match self
            .inner
            .inner_without_borrows
            .conn
            .call_method_raw(
                Some(self.destination()),
                self.path(),
                Some(self.interface()),
                method_name,
                flags,
                body,
            )
            .await?
        {
            Some(reply) => reply.await?.body().deserialize().map(Some),
            None => Ok(None),
        }
    }

    /// Call a method without expecting a reply.
    ///
    /// This sets the `NoReplyExpected` flag on the calling message and does not wait for a reply.
    pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()>
    where
        M: TryInto<MemberName<'m>>,
        M::Error: Into<Error>,
        B: serde::ser::Serialize + zvariant::DynamicType,
    {
        self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body)
            .await?;
        Ok(())
    }

    /// Create a stream for the signal named `signal_name`.
    ///
    /// # Errors
    ///
    /// Apart from general I/O errors that can result from socket communications, calling this
    /// method will also result in an error if the destination service has not yet registered its
    /// well-known name with the bus (assuming you're using the well-known name as destination).
    pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>>
    where
        M: TryInto<MemberName<'m>>,
        M::Error: Into<Error>,
    {
        self.receive_signal_with_args(signal_name, &[]).await
    }

    /// Same as [`Proxy::receive_signal`] but with a filter.
    ///
    /// The D-Bus specification allows you to filter signals by their arguments, which helps avoid
    /// a lot of unnecessary traffic and processing since the filter is run on the server side. Use
    /// this method where possible. Note that this filtering is limited to arguments of string
    /// types.
    ///
    /// The arguments are passed as tuples of argument index and expected value.
    pub async fn receive_signal_with_args<'m, M>(&self, signal_name: M, args: &[(u8, &str)]) -> Result<SignalStream<'m>>
    where
        M: TryInto<MemberName<'m>>,
        M::Error: Into<Error>,
    {
        let signal_name = signal_name.try_into().map_err(Into::into)?;
        self.receive_signals(Some(signal_name), args).await
    }

    async fn receive_signals<'m>(
        &self,
        signal_name: Option<MemberName<'m>>,
        args: &[(u8, &str)],
    ) -> Result<SignalStream<'m>> {
        self.inner.subscribe_dest_owner_change().await?;

        SignalStream::new(self.clone(), signal_name, args).await
    }

    /// Create a stream for all signals emitted by this service.
    pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> {
        self.receive_signals(None, &[]).await
    }

    /// Get a stream to receive property changed events.
    ///
    /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
    /// will only receive the last update.
    ///
    /// If caching is not enabled on this proxy, the resulting stream will not return any events.
    pub async fn receive_property_changed<'name: 'a, T>(&self, name: &'name str) -> PropertyStream<'a, T> {
        let properties = self.get_property_cache();
        let changed_listener = if let Some(properties) = &properties {
            let mut values = properties.values.write().expect("lock poisoned");
            let entry = values.entry(name.to_string()).or_insert_with(PropertyValue::default);
            entry.event.listen()
        } else {
            Event::new().listen()
        };

        PropertyStream {
            name,
            proxy: self.clone(),
            changed_listener,
            phantom: std::marker::PhantomData,
        }
    }

    /// Get a stream to receive destination owner changed events.
    ///
    /// If the proxy destination is a unique name, the stream will be notified of the peer
    /// disconnection from the bus (with a `None` value).
    ///
    /// If the proxy destination is a well-known name, the stream will be notified whenever the name
    /// owner is changed, either by a new peer being granted ownership (`Some` value) or when the
    /// name is released (with a `None` value).
    ///
    /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
    /// will only receive the last update.
    pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'_>> {
        use futures_util::StreamExt;
        let dbus_proxy = fdo::DBusProxy::builder(self.connection())
            .cache_properties(CacheProperties::No)
            .build()
            .await?;
        Ok(OwnerChangedStream {
            stream: dbus_proxy
                .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())])
                .await?
                .map(Box::new(move |signal| {
                    let args = signal.args().unwrap();
                    let new_owner = args.new_owner().as_ref().map(|owner| owner.to_owned());

                    new_owner
                })),
            name: self.destination().clone(),
        })
    }
}

#[derive(Debug, Default)]
struct PropertyValue {
    value: Option<OwnedValue>,
    event: Event,
}

/// Flags to use with [`Proxy::call_with_flags`].
#[bitflags]
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MethodFlags {
    /// No response is expected from this method call, regardless of whether the
    /// signature for the interface method indicates a reply type. When passed,
    /// `call_with_flags` will return `Ok(None)` immediately after successfully
    /// sending the method call.
    ///
    /// Errors encountered while *making* the call will still be returned as
    /// an `Err` variant, but any errors that are triggered by the receiver's
    /// handling of the call will not be delivered.
    NoReplyExpected      = 0x1,

    /// When set on a call whose destination is a message bus, this flag will instruct
    /// the bus not to [launch][al] a service to handle the call if no application
    /// on the bus owns the requested name.
    ///
    /// This flag is ignored when using a peer-to-peer connection.
    ///
    /// [al]: https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-starting-services
    NoAutoStart          = 0x2,

    /// Indicates to the receiver that this client is prepared to wait for interactive
    /// authorization, which might take a considerable time to complete. For example, the receiver
    /// may query the user for confirmation via [polkit] or a similar framework.
    ///
    /// [polkit]: https://gitlab.freedesktop.org/polkit/polkit/
    AllowInteractiveAuth = 0x4,
}

assert_impl_all!(MethodFlags: Send, Sync, Unpin);

impl From<MethodFlags> for Flags {
    fn from(method_flag: MethodFlags) -> Self {
        match method_flag {
            MethodFlags::NoReplyExpected => Self::NoReplyExpected,
            MethodFlags::NoAutoStart => Self::NoAutoStart,
            MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth,
        }
    }
}

type OwnerChangedStreamMap<'a> = Map<
    fdo::NameOwnerChangedStream<'a>,
    Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>,
>;

/// A [`stream::Stream`] implementation that yields `UniqueName` when the bus owner changes.
///
/// Use [`Proxy::receive_owner_changed`] to create an instance of this type.
pub struct OwnerChangedStream<'a> {
    stream: OwnerChangedStreamMap<'a>,
    name: BusName<'a>,
}

assert_impl_all!(OwnerChangedStream<'_>: Send, Sync, Unpin);

impl OwnerChangedStream<'_> {
    /// The bus name being tracked.
    pub fn name(&self) -> &BusName<'_> {
        &self.name
    }
}

impl stream::Stream for OwnerChangedStream<'_> {
    type Item = Option<UniqueName<'static>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use futures_util::StreamExt;
        self.get_mut().stream.poll_next_unpin(cx)
    }
}

/// A [`stream::Stream`] implementation that yields signal [messages](`Message`).
///
/// Use [`Proxy::receive_signal`] to create an instance of this type.
///
/// This type uses a [`MessageStream::for_match_rule`] internally and therefore the note about match
/// rule registration and [`AsyncDrop`] in its documentation applies here as well.
#[derive(Debug)]
pub struct SignalStream<'a> {
    stream: Join<MessageStream, Option<MessageStream>>,
    src_unique_name: Option<UniqueName<'static>>,
    signal_name: Option<MemberName<'a>>,
}

impl<'a> SignalStream<'a> {
    /// The signal name.
    pub fn name(&self) -> Option<&MemberName<'a>> {
        self.signal_name.as_ref()
    }

    async fn new(
        proxy: Proxy<'_>,
        signal_name: Option<MemberName<'a>>,
        args: &[(u8, &str)],
    ) -> Result<SignalStream<'a>> {
        let mut rule_builder = MatchRule::builder()
            .msg_type(Type::Signal)
            .sender(proxy.destination())?
            .path(proxy.path())?
            .interface(proxy.interface())?;
        if let Some(name) = &signal_name {
            rule_builder = rule_builder.member(name)?;
        }
        for (i, arg) in args {
            rule_builder = rule_builder.arg(*i, *arg)?;
        }
        let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into();
        let conn = proxy.connection();

        let (src_unique_name, stream) = match proxy.destination().to_owned() {
            BusName::Unique(name) => (
                Some(name),
                join_streams(MessageStream::for_match_rule(signal_rule, conn, None).await?, None),
            ),
            BusName::WellKnown(name) => {
                use ordered_stream::OrderedStreamExt;

                let name_owner_changed_rule = MatchRule::builder()
                    .msg_type(Type::Signal)
                    .sender("org.freedesktop.DBus")?
                    .path("/org/freedesktop/DBus")?
                    .interface("org.freedesktop.DBus")?
                    .member("NameOwnerChanged")?
                    .add_arg(name.as_str())?
                    .build();
                let name_owner_changed_stream = MessageStream::for_match_rule(
                    name_owner_changed_rule,
                    conn,
                    Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
                )
                .await?
                .map(Either::Left);

                let get_name_owner = conn
                    .call_method_raw(
                        Some("org.freedesktop.DBus"),
                        "/org/freedesktop/DBus",
                        Some("org.freedesktop.DBus"),
                        "GetNameOwner",
                        BitFlags::empty(),
                        &name,
                    )
                    .await
                    .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

                let mut join = join_streams(name_owner_changed_stream, get_name_owner);

                let mut src_unique_name = loop {
                    match join.next().await {
                        Some(Either::Left(Ok(msg))) => {
                            let signal = NameOwnerChanged::from_message(msg)
                                .expect("`NameOwnerChanged` signal stream got wrong message");
                            {
                                break signal
                                    .args()
                                    // SAFETY: The filtering code couldn't have let this through if
                                    // args were not in order.
                                    .expect("`NameOwnerChanged` signal has no args")
                                    .new_owner()
                                    .as_ref()
                                    .map(UniqueName::to_owned);
                            }
                        },
                        Some(Either::Left(Err(_))) => (),
                        Some(Either::Right(Ok(response))) => {
                            break Some(response.body().deserialize::<UniqueName<'_>>()?.to_owned());
                        },
                        Some(Either::Right(Err(e))) => {
                            // Probably the name is not owned. Not a problem but let's still log it.
                            debug!("Failed to get owner of {name}: {e}");

                            break None;
                        },
                        None => {
                            return Err(Error::InputOutput(
                                std::io::Error::new(std::io::ErrorKind::BrokenPipe, "connection closed").into(),
                            ));
                        },
                    }
                };

                // Let's take into account any buffered NameOwnerChanged signal.
                let (stream, _, queued) = join.into_inner();
                if let Some(msg) = queued.and_then(|e| match e.0 {
                    Either::Left(Ok(msg)) => Some(msg),
                    Either::Left(Err(_)) | Either::Right(_) => None,
                }) {
                    if let Some(signal) = NameOwnerChanged::from_message(msg) {
                        if let Ok(args) = signal.args() {
                            match (args.name(), &**args.new_owner()) {
                                (BusName::WellKnown(n), Some(new_owner)) if n == &name => {
                                    src_unique_name = Some(new_owner.to_owned());
                                },
                                _ => (),
                            }
                        }
                    }
                }
                let name_owner_changed_stream = stream.into_inner();

                let stream = join_streams(
                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
                    Some(name_owner_changed_stream),
                );

                (src_unique_name, stream)
            },
        };

        Ok(SignalStream {
            stream,
            src_unique_name,
            signal_name,
        })
    }

    fn filter(&mut self, msg: &Message) -> Result<bool> {
        let header = msg.header();
        let sender = header.sender();
        if sender == self.src_unique_name.as_ref() {
            return Ok(true);
        }

        // The src_unique_name must be maintained in lock-step with the applied filter
        if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) {
            let args = signal.args()?;
            self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned());
        }

        Ok(false)
    }
}

assert_impl_all!(SignalStream<'_>: Send, Sync, Unpin);

impl stream::Stream for SignalStream<'_> {
    type Item = Message;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
    }
}

impl OrderedStream for SignalStream<'_> {
    type Data = Message;
    type Ordering = Sequence;

    fn poll_next_before(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        before: Option<&Self::Ordering>,
    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
        let this = self.get_mut();
        loop {
            match ready!(OrderedStream::poll_next_before(Pin::new(&mut this.stream), cx, before)) {
                PollResult::Item { data, ordering } => {
                    if let Ok(msg) = data {
                        if let Ok(true) = this.filter(&msg) {
                            return Poll::Ready(PollResult::Item { data: msg, ordering });
                        }
                    }
                },
                PollResult::Terminated => return Poll::Ready(PollResult::Terminated),
                PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
            }
        }
    }
}

impl stream::FusedStream for SignalStream<'_> {
    fn is_terminated(&self) -> bool {
        ordered_stream::FusedOrderedStream::is_terminated(&self.stream)
    }
}

#[async_trait::async_trait]
impl AsyncDrop for SignalStream<'_> {
    async fn async_drop(self) {
        let (signals, names, _buffered) = self.stream.into_inner();
        signals.async_drop().await;
        if let Some(names) = names {
            names.async_drop().await;
        }
    }
}

impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> {
    fn from(proxy: crate::blocking::Proxy<'a>) -> Self {
        proxy.into_inner()
    }
}

/// This trait is implemented by all async proxies, which are generated with the
/// [`dbus_proxy`](zbus::dbus_proxy) macro.
pub trait ProxyImpl<'c>
where
    Self: Sized,
{
    /// Return a customizable builder for this proxy.
    fn builder(conn: &Connection) -> Builder<'c, Self>;

    /// Consume `self`, returning the underlying `zbus::Proxy`.
    fn into_inner(self) -> Proxy<'c>;

    /// The reference to the underlying `zbus::Proxy`.
    fn inner(&self) -> &Proxy<'c>;
}

#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use ntest::timeout;
    use test_log::test;

    use super::*;
    use crate::object_server::SignalContext;
    use crate::utils::block_on;
    use crate::{
        connection,
        interface,
        proxy,
    };

    #[ignore = "fails in ci"]
    #[test]
    #[timeout(15000)]
    fn signal() {
        block_on(test_signal()).unwrap();
    }

    async fn test_signal() -> Result<()> {
        // Register a well-known name with the session bus and ensure we get the appropriate
        // signals called for that.
        let conn = Connection::session().await?;
        let dest_conn = Connection::session().await?;
        let unique_name = dest_conn.unique_name().unwrap().clone();

        let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest";
        let proxy: Proxy<'_> = Builder::new(&conn)
            .destination(well_known)?
            .path("/does/not/matter")?
            .interface("does.not.matter")?
            .build()
            .await?;
        let mut owner_changed_stream = proxy.receive_owner_changed().await?;

        let proxy = fdo::DBusProxy::new(&dest_conn).await?;
        let mut name_acquired_stream = proxy
            .inner()
            .receive_signal_with_args("NameAcquired", &[(0, well_known)])
            .await?;

        let prop_stream = proxy
            .inner()
            .receive_property_changed("SomeProp")
            .await
            .filter_map(|changed| async move {
                let v: Option<u32> = changed.get().await.ok();
                v
            });
        drop(proxy);
        drop(prop_stream);

        dest_conn.request_name(well_known).await?;

        let (new_owner, acquired_signal) =
            futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),);

        assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name);

        let acquired_signal = acquired_signal.unwrap();
        assert_eq!(acquired_signal.body().deserialize::<&str>().unwrap(), well_known);

        let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter", "does.not.matter").await?;
        let mut unique_name_changed_stream = proxy.receive_owner_changed().await?;

        drop(dest_conn);
        name_acquired_stream.async_drop().await;

        // There shouldn't be an owner anymore.
        let new_owner = owner_changed_stream.next().await;
        assert!(new_owner.unwrap().is_none());

        let new_unique_owner = unique_name_changed_stream.next().await;
        assert!(new_unique_owner.unwrap().is_none());

        Ok(())
    }

    #[ignore = "fails in ci"]
    #[test]
    #[timeout(15000)]
    fn signal_stream_deadlock() {
        block_on(test_signal_stream_deadlock()).unwrap();
    }

    /// Tests deadlocking in signal reception when the message queue is full.
    ///
    /// Creates a connection with a small message queue, and a service that
    /// emits signals at a high rate. First a listener is created that listens
    /// for that signal which should fill the small queue. Then another signal
    /// signal listener is created against another signal. Previously, this second
    /// call to add the match rule never resolved and resulted in a deadlock.
    async fn test_signal_stream_deadlock() -> Result<()> {
        #[proxy(
            gen_blocking = false,
            default_path = "/org/zbus/Test",
            default_service = "org.zbus.Test.MR501",
            interface = "org.zbus.Test"
        )]
        trait Test {
            #[zbus(signal)]
            fn my_signal(&self, msg: &str) -> Result<()>;
        }

        struct TestIface;

        #[interface(name = "org.zbus.Test")]
        impl TestIface {
            #[zbus(signal)]
            async fn my_signal(context: &SignalContext<'_>, msg: &'static str) -> Result<()>;
        }

        let test_iface = TestIface;
        let server_conn = connection::Builder::session()?
            .name("org.zbus.Test.MR501")?
            .serve_at("/org/zbus/Test", test_iface)?
            .build()
            .await?;

        let client_conn = connection::Builder::session()?.max_queued(1).build().await?;

        let test_proxy = TestProxy::new(&client_conn).await?;
        let test_prop_proxy = PropertiesProxy::builder(&client_conn)
            .destination("org.zbus.Test.MR501")?
            .path("/org/zbus/Test")?
            .build()
            .await?;

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        let handle = {
            let tx = tx.clone();
            let conn = server_conn.clone();
            let server_fut = async move {
                use std::time::Duration;

                #[cfg(not(feature = "tokio"))]
                use async_io::Timer;
                #[cfg(feature = "tokio")]
                use tokio::time::sleep;

                let iface_ref = conn
                    .object_server()
                    .interface::<_, TestIface>("/org/zbus/Test")
                    .await
                    .unwrap();

                let context = iface_ref.signal_context();
                while !tx.is_closed() {
                    for _ in 0..10 {
                        TestIface::my_signal(context, "This is a test").await.unwrap();
                    }

                    #[cfg(not(feature = "tokio"))]
                    Timer::after(Duration::from_millis(5)).await;

                    #[cfg(feature = "tokio")]
                    sleep(Duration::from_millis(5)).await;
                }
            };
            server_conn.executor().spawn(server_fut, "server_task")
        };

        let signal_fut = async {
            let mut signal_stream = test_proxy.receive_my_signal().await.unwrap();

            tx.send(()).await.unwrap();

            while let Some(_signal) = signal_stream.next().await {}
        };

        let prop_fut = async move {
            rx.recv().await.unwrap();
            let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap();
        };

        futures_util::pin_mut!(signal_fut);
        futures_util::pin_mut!(prop_fut);

        futures_util::future::select(signal_fut, prop_fut).await;

        handle.await;

        Ok(())
    }
}
